{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## maggy - MNIST Example\n",
    "---\n",
    "Updated: 02/07/2019"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook illustrates the usage of the maggy framework for asynchronous hyperparameter optimization on the MNIST dataset.  \n",
    "\n",
    "In this example we run random search over three hyperparameters and use the median stopping rule for early stopping, which takes advantage of the framework's asynchrony. The median stopping rule stops a trial if its performance falls below the median of other trials at similar points in time.\n",
    "\n",
    "We are using Keras for this example. This notebook works with any Spark cluster, provided that you are using maggy 0.1. In future versions we will add functionality that relies on Hopsworks.\n",
    "\n",
    "This notebook has been tested with TensorFlow 1.11.0 and Spark 2.4.0.  \n",
    "Requires Python 3.6 or higher."
   ]
  },
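  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a rough illustration of the median stopping rule described above, the following sketch is plain Python and not maggy's implementation. The function name `should_stop`, the `min_finished` parameter and the list-based metric histories are assumptions made for this example, and the metric is assumed to be one where higher is better.\n",
    "\n",
    "```python\n",
    "from statistics import median\n",
    "\n",
    "\n",
    "def should_stop(running_history, finished_histories, min_finished=2):\n",
    "    # running_history: metric values reported so far by the running trial\n",
    "    # finished_histories: one list of metric values per finished trial\n",
    "    step = len(running_history)\n",
    "    if step == 0:\n",
    "        return False\n",
    "    # Only compare against finished trials that reached the same step\n",
    "    peers = [h[step - 1] for h in finished_histories if len(h) >= step]\n",
    "    if len(peers) < min_finished:\n",
    "        return False\n",
    "    # Stop if the latest value is below the median of the peers at this step\n",
    "    return running_history[-1] < median(peers)\n",
    "\n",
    "\n",
    "finished = [[0.50, 0.70, 0.80], [0.40, 0.60, 0.75], [0.55, 0.72, 0.85]]\n",
    "print(should_stop([0.30, 0.45], finished))  # trial below the median at step 2\n",
    "print(should_stop([0.60, 0.78], finished))  # trial above the median at step 2\n",
    "```\n",
    "\n",
    "The `min_finished` guard plays a role similar to the `es_min` setting used when launching the experiment: no trial is stopped until enough finished trials exist to make the median meaningful."
   ]
  },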
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1. Spark Session"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Make sure you have a running Spark session/context available. On Hopsworks, executing any simple command is enough to start the Spark application."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Hello World!\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2. Searchspace definition\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We want to conduct random search for the MNIST example over three hyperparameters: kernel size, pooling size and dropout rate. Hence, we have two integer-valued parameters and one double-valued parameter."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from maggy import Searchspace\n",
    "\n",
    "# The searchspace can be instantiated with parameters\n",
    "sp = Searchspace(kernel=('INTEGER', [2, 8]), pool=('INTEGER', [2, 8]))\n",
    "\n",
    "# Or additional parameters can be added one by one\n",
    "sp.add('dropout', ('DOUBLE', [0.01, 0.99]))"
   ]
  },
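  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the idea of random search over this search space concrete, here is a minimal sketch in plain Python. It does not use or reproduce maggy's internals: the `space` dictionary and the `sample` helper are hypothetical names for illustration, and treating both bounds as inclusive is an assumption.\n",
    "\n",
    "```python\n",
    "import random\n",
    "\n",
    "# Hypothetical stand-in for the search space defined above\n",
    "space = {\n",
    "    'kernel': ('INTEGER', [2, 8]),\n",
    "    'pool': ('INTEGER', [2, 8]),\n",
    "    'dropout': ('DOUBLE', [0.01, 0.99]),\n",
    "}\n",
    "\n",
    "\n",
    "def sample(space, rng):\n",
    "    # Draw one hyperparameter combination uniformly at random\n",
    "    params = {}\n",
    "    for name, (kind, (low, high)) in space.items():\n",
    "        if kind == 'INTEGER':\n",
    "            params[name] = rng.randint(low, high)  # both bounds inclusive\n",
    "        elif kind == 'DOUBLE':\n",
    "            params[name] = rng.uniform(low, high)\n",
    "        else:\n",
    "            raise ValueError('unsupported parameter type: ' + kind)\n",
    "    return params\n",
    "\n",
    "\n",
    "rng = random.Random(42)  # fixed seed so the draws are reproducible\n",
    "trials = [sample(space, rng) for _ in range(3)]\n",
    "for trial in trials:\n",
    "    print(trial)\n",
    "```\n",
    "\n",
    "Each sampled dictionary corresponds to one trial, that is, one set of keyword arguments for the training function."
   ]
  },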
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3. Model training definition\n",
    "\n",
    "The programming model is to wrap the model training code in a wrapper function. Inside that wrapper function, provide all imports and all other parts that make up your experiment.\n",
    "\n",
    "There are several requirements for this wrapper function:\n",
    "\n",
    "1. The function should take the hyperparameters as arguments, plus one additional parameter `reporter`, which is needed for reporting the current metric to the experiment driver.\n",
    "2. The function should return the metric that you want to optimize. This should coincide with the metric being reported in the Keras callback (see next point).\n",
    "3. To leverage the early stopping capabilities of maggy, you need to use the maggy reporter API. By including the reporter in your training loop, you tell maggy which metric to report back to the experiment driver for optimization and for early stopping checks. It is as easy as adding `reporter.broadcast(metric=YOUR_METRIC)`, for example at the end of your epoch or batch training step, and adding a `reporter` argument to your function signature. If you are not writing your own training loop, you can use the pre-written Keras callbacks:\n",
    "    - KerasBatchEnd\n",
    "    - KerasEpochEnd  \n",
    "(Please see the documentation for a detailed explanation.)\n",
    "\n",
    "We are going to use the `KerasBatchEnd` callback to report the accuracy after each batch. Note, however, that the BatchEnd callback only has access to the training accuracy, since running validation after each batch would be too expensive.\n"
   ]
  },
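  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For a hand-written training loop, the reporter call described in point 3 might look like the sketch below. `FakeReporter` is a hypothetical stand-in so that the snippet runs outside of maggy, the `train` function name is made up, and the accuracy values are placeholders rather than real training results. Only the `reporter.broadcast(metric=...)` call is taken from the description above.\n",
    "\n",
    "```python\n",
    "class FakeReporter:\n",
    "    # Hypothetical stand-in for the reporter object passed to the wrapper function\n",
    "\n",
    "    def __init__(self):\n",
    "        self.metrics = []\n",
    "\n",
    "    def broadcast(self, metric):\n",
    "        # Store the metric so the example can inspect what was reported\n",
    "        self.metrics.append(metric)\n",
    "\n",
    "\n",
    "def train(kernel, pool, dropout, reporter):\n",
    "    # Skeleton of a wrapper function with a custom training loop\n",
    "    accuracy = 0.0\n",
    "    for epoch in range(3):\n",
    "        # Placeholder for one epoch of real training and evaluation\n",
    "        accuracy = 0.5 + 0.1 * epoch\n",
    "        # Report the current metric at the end of each epoch\n",
    "        reporter.broadcast(metric=accuracy)\n",
    "    # Return the same metric that was reported during training\n",
    "    return accuracy\n",
    "\n",
    "\n",
    "reporter = FakeReporter()\n",
    "final = train(kernel=3, pool=2, dropout=0.25, reporter=reporter)\n",
    "print(final, reporter.metrics)\n",
    "```\n",
    "\n",
    "The signature mirrors requirement 1 (hyperparameters plus `reporter`), and the return value mirrors requirement 2."
   ]
  },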
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from maggy import experiment\n",
    "from maggy.callbacks import KerasBatchEnd"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Definition of the training wrapper function:\n",
    "(maggy-specific parts are highlighted with comments and correspond to the three points described above.)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#########\n",
    "### maggy: hyperparameters as arguments and including the reporter\n",
    "#########\n",
    "def keras(kernel, pool, dropout, reporter):\n",
    "    import tensorflow as tf\n",
    "    from tensorflow import keras\n",
    "    from tensorflow.keras.models import Sequential\n",
    "    from tensorflow.keras.layers import Dense, Dropout, Flatten\n",
    "    from tensorflow.keras.layers import Conv2D, MaxPooling2D\n",
    "    from tensorflow.keras.callbacks import TensorBoard\n",
    "\n",
    "    # Hopsworks helpers for the TensorBoard log directory and HDFS project paths\n",
    "    from hops import tensorboard\n",
    "    from hops import hdfs\n",
    "\n",
    "    batch_size = 500\n",
    "    num_classes = 10\n",
    "    epochs = 10\n",
    "\n",
    "    # Input image dimensions\n",
    "    img_rows, img_cols = 28, 28\n",
    "    \n",
    "    train_filenames = [hdfs.project_path() + \"TourData/mnist/train/train.tfrecords\"]\n",
    "    validation_filenames = [hdfs.project_path() + \"TourData/mnist/validation/validation.tfrecords\"]\n",
    "    \n",
    "    # Determine the number of records in the TFRecords\n",
    "    num_train = 0\n",
    "    for fn in train_filenames:\n",
    "        for record in tf.python_io.tf_record_iterator(fn):\n",
    "            num_train += 1\n",
    "    num_val = 0\n",
    "    for fn in validation_filenames:\n",
    "        for record in tf.python_io.tf_record_iterator(fn):\n",
    "            num_val += 1\n",
    "    \n",
    "    # Create an iterator over the dataset\n",
    "    def data_input_it(filenames, batch_size=128, shuffle=False, repeat=None):\n",
    "\n",
    "        def parser(serialized_example):\n",
    "            \"\"\"Parses a single tf.Example into image and label tensors.\"\"\"\n",
    "            features = tf.parse_single_example(\n",
    "                serialized_example,\n",
    "                features={\n",
    "                    'image_raw': tf.FixedLenFeature([], tf.string),\n",
    "                    'label': tf.FixedLenFeature([], tf.int64),\n",
    "                })\n",
    "            image = tf.decode_raw(features['image_raw'], tf.uint8)\n",
    "            image.set_shape([28 * 28])\n",
    "\n",
    "            # Normalize the values of the image from the range [0, 255] to [-0.5, 0.5]\n",
    "            image = tf.cast(image, tf.float32) / 255 - 0.5\n",
    "            label = tf.cast(features['label'], tf.int32)\n",
    "            # Reshape the tensor\n",
    "            image = tf.reshape(image, [img_rows, img_cols, 1])\n",
    "    \n",
    "            # Create a one hot array for your labels\n",
    "            label = tf.one_hot(label, num_classes)\n",
    "            \n",
    "            \n",
    "            return image, label\n",
    "\n",
    "\n",
    "        # Import MNIST data\n",
    "        dataset = tf.data.TFRecordDataset(filenames)\n",
    "\n",
    "        # Map the parser over dataset, and batch results by up to batch_size\n",
    "        dataset = dataset.map(parser)\n",
    "        if shuffle:\n",
    "            dataset = dataset.shuffle(buffer_size=128)\n",
    "        dataset = dataset.batch(batch_size)\n",
    "        dataset = dataset.repeat(repeat)\n",
    "        iterator = dataset.make_one_shot_iterator()\n",
    "\n",
    "        return iterator\n",
    "    \n",
    "    input_shape = (28, 28, 1)\n",
    "\n",
    "    model = Sequential()\n",
    "    model.add(Conv2D(32, kernel_size=(kernel, kernel),\n",
    "                     activation='relu',\n",
    "                     input_shape=input_shape))\n",
    "    model.add(Conv2D(64, (kernel, kernel), activation='relu'))\n",
    "    model.add(MaxPooling2D(pool_size=(pool, pool)))\n",
    "    model.add(Dropout(dropout))\n",
    "    model.add(Flatten())\n",
    "    model.add(Dense(128, activation='relu'))\n",
    "    model.add(Dropout(dropout))\n",
    "    model.add(Dense(num_classes, activation='softmax'))\n",
    "\n",
    "    opt = keras.optimizers.Adadelta(1.0)\n",
    "\n",
    "    model.compile(loss=keras.losses.categorical_crossentropy,\n",
    "                  optimizer=opt,\n",
    "                  metrics=['accuracy'])\n",
    "    \n",
    "    # Set up the TensorBoard callback (note: it is created here but not added to `callbacks` below)\n",
    "    tb_callback = TensorBoard(log_dir=tensorboard.logdir(), histogram_freq=0,\n",
    "                              write_graph=True, write_images=True)\n",
    "    \n",
    "    #########\n",
    "    ### maggy: REPORTER API through keras callback\n",
    "    #########\n",
    "    callbacks = [KerasBatchEnd(reporter, metric='acc')]\n",
    "    \n",
    "    # Initialize the iterators\n",
    "    train_input_it = data_input_it(train_filenames[0], batch_size=batch_size)\n",
    "    eval_input_it = data_input_it(validation_filenames[0], batch_size=batch_size)\n",
    "\n",
    "    model.fit(train_input_it,\n",
    "              steps_per_epoch = num_train//batch_size,\n",
    "              callbacks=callbacks, # add callback\n",
    "              epochs=epochs,\n",
    "              verbose=1,\n",
    "              validation_data=eval_input_it,\n",
    "              validation_steps=num_val//batch_size)\n",
    "    score = model.evaluate(eval_input_it, steps=num_val//batch_size, verbose=1)\n",
    "    \n",
    "    # Using print in the wrapper function prints underneath the Jupyter cell, with a\n",
    "    # prefix that indicates which prints come from the same executor\n",
    "    \n",
    "    print('Validation loss:', score[0])\n",
    "    print('Validation accuracy:', score[1])\n",
    "    \n",
    "    #########\n",
    "    ### maggy: return the metric to be optimized, validation accuracy in this case\n",
    "    #########\n",
    "    return score[1]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 4. Launching the experiment\n",
    "\n",
    "Finally, we are ready to launch the maggy experiment.\n",
    "There are a variety of parameters to specify, some of which are optional:\n",
    "1. `map_fun`: your previously specified training wrapper function\n",
    "2. `searchspace`: the searchspace object\n",
    "3. `optimizer`: the optimization algorithm to be used (only 'randomsearch' available at the moment)\n",
    "4. `direction`: maximize or minimize the specified metric\n",
    "5. `num_trials`: number of different parameter combinations to be evaluated\n",
    "6. `name`: an experiment name\n",
    "7. `hb_interval`: Time in seconds between the heartbeat messages that carry the metric to the experiment driver. A sensible value is not much smaller than the interval at which your training loop updates the metric. For example, with the KerasBatchEnd reporter callback, it does not make sense to use an interval much shorter than the time it takes to process one batch.\n",
    "8. `es_interval`: Interval in seconds that specifies how often the currently running trials are checked for early stopping. It should be larger than `hb_interval`.\n",
    "9. `es_min`: Minimum number of trials that must be finished before early stopping checks begin. For example, the median stopping rule stops a trial if its performance falls below the median of finished trials at similar points in time, so we only want to start comparing against the median once several trials have finished."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "result = experiment.lagom(map_fun=keras, \n",
    "                           searchspace=sp, \n",
    "                           optimizer='randomsearch', \n",
    "                           direction='max',\n",
    "                           num_trials=15, \n",
    "                           name='MNIST', \n",
    "                           hb_interval=5, \n",
    "                           es_interval=5,\n",
    "                           es_min=5\n",
    "                          )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To observe the progress, you can check the stderr of the Spark executors. TensorBoard support will be added in an upcoming version."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}